#include "config.h"
#include "lsv_wbuffer_internal.h"
#include "lsv_rcache.h"
#include "lsv_bitmap.h"

int lsv_rcache_hash_index_init_i(hash_index_head ** rcache_hash_index, lsv_s32_t hash_table_size){
        int ret = 0;
        lsv_s32_t i = 0;
        hash_index_head * hash_index = NULL;

        hash_index = (hash_index_head *) malloc(sizeof(hash_index_head) * hash_table_size);
        if(NULL == hash_index){
                DERROR("malloc hash index table failed.\n");
                ret = -ENOMEM;
                return ret;
        }
        memset(hash_index, 0, sizeof(hash_index_head) * hash_table_size);
        for(i = 0; i < hash_table_size; i++){
                INIT_LIST_HEAD(&hash_index[i].list);
        }
        *rcache_hash_index = hash_index;

        return ret;
}

/**
 * 1 init hash table for manage wbuffer
 *
 */
int lsv_rcache_hash_index_init(lsv_volume_proto_t *volume_proto){
        int ret = 0;

        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;

        //page
        if(LSV_RCACHE_PAGE_NUM_MAX > 0){
                lsv_rcache_hash_index_init_i(&rcache->page_hash_index, RCACHE_INDEX_SIZE);
                lsv_rcache_hash_index_init_i(&rcache->page_chunk_hash_index, RCACHE_INDEX_SIZE);
        }
        //chunk
        if(LSV_RCACHE_CHUNK_NUM_MAX > 0)
                lsv_rcache_hash_index_init_i(&rcache->chunk_hash_index, RCACHE_INDEX_SIZE);

        if(LSV_RCACHE_CHUNK_NUM_MAX_L2 > 0){
                lsv_rcache_hash_index_init_i(&rcache->l2_chunk_hash_index, RCACHE_INDEX_SIZE);
                lsv_rcache_ssd_chunk_init(volume_proto);
        }
        return ret;
}

int lsv_rcache_hash_index_destroy(lsv_volume_proto_t *volume_proto){
        int ret = 0;
        lsv_rcache_t * rcache = NULL;

        rcache = (lsv_rcache_t *) volume_proto->rcache;
        if (NULL != rcache->page_hash_index) {
                free(rcache->page_hash_index);
        }

        if (NULL != rcache->page_chunk_hash_index) {
                free(rcache->page_chunk_hash_index);
        }

        if (NULL != rcache->chunk_hash_index) {
                free(rcache->chunk_hash_index);
        }

        if(LSV_RCACHE_CHUNK_NUM_MAX > 0){
                lsv_rcache_ssd_chunk_free(volume_proto);
                if(NULL != rcache->l2_chunk_hash_index){
                        free(rcache->l2_chunk_hash_index);
                }
        }

        return ret;
}

/*当stat_reset为1时，将cu->hit_count和cu->bmap分别重置，即当内存chunk索引插入前，调用此函数，可避免有重复的索引信息*/
int lsv_rcache_chunk_hash_index_regc(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id){
        int ret = 0;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;

        if(NULL == rcache->chunk_hash_index){
                DINFO("chunk_hash_index is NULL!\n");
                return 0;
        }

        struct list_head *pos, *next;
        lsv_s32_t hash_key = HASH_KEY1(chunk_id);
        lsv_rcache_chunk_unit *cu = NULL;
#if 0
        /*为保证regc能够不受锁影响，当执行该函数时，首先将对应的cu的invalid_chunk_id置为chunk_id，
         * 当进行lookup操作时，判断cu的chunk_id和invalid_chunk_id是否一致，
         * 若一致，表示该cu已经被回收，但由于锁的左右，暂时还未被真正回收*/
        list_for_each_safe(pos, next, &rcache->chunk_hash_index[hash_key].list){
                cu = list_entry(pos, lsv_rcache_chunk_unit, index_list);
                if(cu->chunk_id == chunk_id){
                        cu->invalid_chunk_id = chunk_id;
                }
        }
#endif
        list_for_each_prev_safe(pos, next, &rcache->chunk_hash_index[hash_key].list){
                cu = list_entry(pos, lsv_rcache_chunk_unit, index_list);
                if (cu->chunk_id == chunk_id) {
                        DINFO("regc chunk_hash_index chunk_id %u cu %p......\n", chunk_id, cu);
                        list_del_init(&cu->index_list);
                        list_del_init(&cu->chunk_lru_hook);
                        lsv_rcache_cu_reset(volume_proto, cu);
                        list_add_tail(&cu->chunk_lru_hook, &rcache->chunk_lru);
                } else {
                        DINFO("regc chunk_hash_index chunk_id %u -> chunk_id: %u cu %p abort regc......\n",
                                        chunk_id, cu->chunk_id, cu);
                }
        }

        return ret;
}

int lsv_rcache_l2_cache_hash_index_regc(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id){
        int ret = 0;
        struct list_head *pos, *next;
        lsv_rcache_chunk_l2_unit *l2u = NULL;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
        lsv_s32_t hash_key = HASH_KEY1(chunk_id);
        if(rcache->l2_chunk_hash_index == NULL){
                DINFO("l2_chunk_hash_index is NULL!\n");
                return 0;
        }
        list_for_each_prev_safe(pos, next, &rcache->l2_chunk_hash_index[hash_key].list){
                l2u = list_entry(pos, lsv_rcache_chunk_l2_unit, index_list);
                if(l2u->chunk_id == chunk_id) {
                        DINFO("l2_cache: regc chunk_id %u  l2u %p\n", chunk_id, l2u);
                        l2u->chunk_id = 0;
                        l2u->invalid_chunk_id = 0;
                        list_del_init(&l2u->index_list);
                        //TODO 回收的l2u是否不需要挪动位置
                        list_del_init(&l2u->l2_lru_hook);
                        list_add_tail(&l2u->l2_lru_hook, &rcache->l2_lru);
                } else {
                        DINFO("l2_cache: regc chunk_id %u -> chunk_id %u abort regc\n", chunk_id, l2u->chunk_id);
                }
        }

        return ret;
}

int lsv_rcache_l2_cache_hash_index_regc_with_mark(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id) {
        int ret = 0;
        struct list_head *pos, *next;
        lsv_rcache_chunk_l2_unit *l2u = NULL;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
        lsv_s32_t hash_key = HASH_KEY1(chunk_id);
        if(rcache->l2_chunk_hash_index == NULL){
                DINFO("l2_chunk_hash_index is NULL!\n");
                return 0;
        }
        list_for_each_prev_safe(pos, next, &rcache->l2_chunk_hash_index[hash_key].list){
                l2u = list_entry(pos, lsv_rcache_chunk_l2_unit, index_list);
                if(l2u->chunk_id == chunk_id) {
                        DINFO("l2_cache: regc chunk_id %u  l2u %p\n", chunk_id, l2u);
                        l2u->invalid_chunk_id = chunk_id;
                }
        }

        return ret;
}

int lsv_rcache_page_hash_index_regc_with_chunkid(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id){
        int ret = 0;
        lsv_s32_t hash_key = HASH_KEY1(chunk_id);
        lsv_rcache_page_unit * pu = NULL;
        struct list_head *pos, *next;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;

        if(NULL == rcache->page_chunk_hash_index){
                DINFO("page_chunk_hash_index is NULL\n");
                return 0;
        }

        list_for_each_prev_safe(pos, next, &rcache->page_chunk_hash_index[hash_key].list){
                pu = list_entry(pos, lsv_rcache_page_unit, chunk_list);
                if(pu->chunk_id == chunk_id){
                        DINFO("page_chunk_hash_index: regc chunk_id %u pu %p lba %llu\n", chunk_id, pu,
                                        (LLU)pu->lba);

                        pu->chunk_id = 0;
                        pu->chunk_off = 0;
                        list_del_init(&pu->index_list);
                        list_del_init(&pu->chunk_list);
                        list_del_init(&pu->page_lru_hook);
                        list_add_tail(&pu->page_lru_hook, &rcache->page_lru);
                } else {
                        DINFO("page_chunk_hash_index: regc chunk_id %u pu %p lba %llu -> chunk_id %u abort regc\n",
                                        chunk_id, pu, (LLU)pu->lba, pu->chunk_id);
                }
        }

        return ret;
}

int lsv_rcache_page_hash_index_regc(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_off){
        int ret = 0;
        lsv_s32_t  hash_key = HASH_KEY1(chunk_id);
        lsv_rcache_page_unit * pu = NULL;
        struct list_head *pos, *next;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;

        if(NULL == rcache->page_chunk_hash_index){
                DINFO("page_chunk_hash_index is NULL\n");
                return 0;
        }

        list_for_each_prev_safe(pos, next, &rcache->page_chunk_hash_index[hash_key].list){
                pu = list_entry(pos, lsv_rcache_page_unit, chunk_list);
                if(pu->chunk_id ==chunk_id && pu->chunk_off == chunk_off){
                        DINFO("page_chunk_hash_index: regc chunk_id %u pu %p lba %llu chunk_off %d\n",
                                        chunk_id, pu, (LLU)pu->lba, pu->chunk_off);
                        pu->chunk_id = 0;
                        pu->chunk_off = 0;
                        list_del_init(&pu->index_list);
                        list_del_init(&pu->chunk_list);

                        list_del_init(&pu->page_lru_hook);
                        list_add_tail(&pu->page_lru_hook, &rcache->page_lru);
                }
        }

        return ret;
}

int lsv_rcache_hash_index_regc_all(lsv_volume_proto_t * volume_proto){
        int ret = 0;
        struct list_head *pos, *next;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;

        lsv_rcache_page_unit * pu = NULL;
        lsv_rcache_chunk_unit *cu = NULL;
        lsv_rcache_chunk_l2_unit *l2u = NULL;
        lsv_rcache_iotrace_unit *riu = NULL;

        lsv_wrlock(&rcache->rwlock);
        list_for_each_safe(pos, next, &rcache->page_lru){
                pu = list_entry(pos, lsv_rcache_page_unit, page_lru_hook);
                list_del_init(&pu->index_list);
                list_del_init(&pu->chunk_list);
        }

        list_for_each_safe(pos, next, &rcache->chunk_lru){
                cu = list_entry(pos, lsv_rcache_chunk_unit, chunk_lru_hook);
                list_del_init(&cu->index_list);
                lsv_rcache_cu_reset(volume_proto, cu);
        }

        list_for_each_safe(pos, next, &rcache->l2_lru){
                l2u = list_entry(pos, lsv_rcache_chunk_l2_unit, l2_lru_hook);
                list_del_init(&l2u->index_list);
        }

        list_for_each_safe(pos, next, &rcache->io_list){
                riu = list_entry(pos, lsv_rcache_iotrace_unit, list);
                riu->chunk_id = 0;
        }

        lsv_rwunlock(&rcache->rwlock);

        return ret;
}

int lsv_rcache_page_list_clear(lsv_volume_proto_t *volume_proto, lsv_rcache_page_unit *pu){

        (void) volume_proto;
        list_del_init(&pu->page_lru_hook);
        if(!list_empty(&pu->index_list))
                list_del_init(&pu->index_list);
        if(!list_empty(&pu->chunk_list))
                list_del_init(&pu->chunk_list);
        return 0;
}

int lsv_rcache_page_hash_index_insert(lsv_volume_proto_t *volume_proto, lsv_rcache_page_unit *pu){

        lsv_s32_t hash_key = HASH_KEY(pu->lba);
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
        hash_index_head * hash_index = rcache->page_hash_index;

        list_add(&pu->page_lru_hook, &rcache->page_lru);

        list_add(&pu->index_list, &hash_index[hash_key].list);

        hash_key = HASH_KEY1(pu->chunk_id);
        hash_index = rcache->page_chunk_hash_index;
        list_add(&pu->chunk_list, &hash_index[hash_key].list);

        return 0;
}

int lsv_rcache_page_hash_index_lookup(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id,
                lsv_s32_t chunk_off, lsv_u64_t off, lsv_u32_t size, buffer_t *append_buf) {
        int ret = 0;
        lsv_s32_t hash_key = 0;
        lsv_u64_t lba = 0;
        lsv_s32_t page_offset = 0;
        struct list_head *pos, *n;
        lsv_rcache_page_unit * pu = NULL;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
        hash_index_head * hash_index = rcache->page_hash_index;

        if(NULL ==  hash_index){
                return ret;
        }

        page_offset = off % LSV_PAGE_SIZE;
        lba = OFF2LBA(off);
        hash_key = HASH_KEY(lba);

        list_for_each_safe(pos, n, &hash_index[hash_key].list){
                pu = list_entry(pos, lsv_rcache_page_unit, index_list);
                if(pu->lba == lba) {
                        if (chunk_id != 0 && chunk_off != 0) {
                                YASSERT(pu->chunk_id == chunk_id && pu->chunk_off == chunk_off);
                        }

                        list_del_init(&pu->page_lru_hook);
                        list_add(&pu->page_lru_hook, &rcache->page_lru);

                        mbuffer_appendmem(append_buf, pu->buf + page_offset, size);

                        DINFO("lba %llu crc %u chunk_id %u page %d hash %d pu %p\n",
                                        (LLU)lba,
#ifdef __PAGE_CRC32__
                                        page_crc32(pu->buf),
#else
                                        0,
#endif
                                        pu->chunk_id,
                                        CHUNKOFF2PAGEIDX(pu->chunk_off),
                                        hash_key,
                                        pu);

                        return size;
                }
        }

        return 0;
}

int lsv_rcache_chunk_hash_index_insert(lsv_volume_proto_t *volume_proto, lsv_rcache_chunk_unit *cu){
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
        hash_index_head * hash_index = rcache->chunk_hash_index;
        lsv_s32_t hash_key = HASH_KEY1(cu->chunk_id);

        list_add(&cu->index_list, &hash_index[hash_key].list);

        return 0;
}

int lsv_rcache_chunk_hash_index_lookup(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_off,
                lsv_u64_t off, lsv_s32_t size, buffer_t *append_buf) {
        lsv_s32_t hash_key = 0;
        lsv_s32_t page_idx = 0;
        lsv_u64_t lba = 0;
        lsv_s32_t page_offset = 0;
        struct list_head *pos;
        lsv_rcache_chunk_unit * cu = NULL;
        lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
        hash_index_head * hash = rcache->chunk_hash_index;

        if(NULL == hash){
                return 0;
        }

        page_offset = off % LSV_WBUF_PAGE_SIZE;
        lba = off - page_offset;
        hash_key = HASH_KEY1(chunk_id);

        list_for_each(pos, &hash[hash_key].list){
                cu = list_entry(pos, lsv_rcache_chunk_unit, index_list);
                if(cu->chunk_id == chunk_id){
                        //if(cu->chunk_id != chunk_id || cu->invalid_chunk_id == chunk_id){
                        if(cu->chunk_id != chunk_id){
                                //DINFO("chunk_id %u cu->chunk_id %u cu->invalid_chunk_id %u!!!!!\n",
                                //      chunk_id, cu->chunk_id, cu->invalid_chunk_id);
                                return 0;
                        }

                        // update LRU
                        list_del_init(&cu->chunk_lru_hook);
                        list_add(&cu->chunk_lru_hook, &rcache->chunk_lru);

                        cu->hit_count ++;
                        page_idx = chunk_off / LSV_PAGE_SIZE;

                        // 设置page_idx对应bit为1, page_idx：[1,256]
                        cu->bmap[page_idx / 64] |= 1 << (page_idx % 64);
                        mbuffer_appendmem(append_buf, cu->buf + chunk_off + page_offset, size);

                        // TODO 如果page hash已满，会引发swap，有什么影响？
                        lsv_rcache_pu_insert(volume_proto, chunk_id, chunk_off, off, size, cu->buf);
#ifdef __PAGE_CRC32__
                        uint32_t crc = page_crc32(cu->buf + chunk_off);
#else
                        uint32_t crc = 0;
#endif
                        DINFO("lba %llu crc %u chunk_id %u page %d size %d hash %d cu %p\n",
                                        (LLU)lba, crc, cu->chunk_id, CHUNKOFF2PAGEIDX(chunk_off),
                                        size, hash_key, cu);

                        return size;
                }
                }

                return 0;
        }

        int lsv_rcache_l2_cache_hash_index_lookup(lsv_volume_proto_t * volume_proto, lsv_u32_t chunk_id, lsv_s32_t chunk_off,
                        lsv_u64_t off, lsv_s32_t size, buffer_t *append_buf) {
                int ret = 0;
                lsv_s32_t hash_key = 0;
                lsv_u64_t lba = 0;
                lsv_s32_t page_offset = 0;
                struct list_head *pos;
                lsv_rcache_chunk_l2_unit * l2u = NULL;
                lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
                hash_index_head * hash = rcache->l2_chunk_hash_index;
                lsv_s8_t *buf;

                if(NULL == hash){
                        return 0;
                }

                page_offset = off % LSV_WBUF_PAGE_SIZE;
                lba = off - page_offset;

                hash_key = HASH_KEY1(chunk_id);

                list_for_each(pos, &hash[hash_key].list){
                        l2u = list_entry(pos, lsv_rcache_chunk_l2_unit, index_list);
                        if(l2u->chunk_id == chunk_id && l2u->invalid_chunk_id != chunk_id){
                                // TODO load page from ssd cache
                                // 如果LSV_RCACHE_CHUNK_NUM_MAX > 0，则应考虑将l2_cache load入内存中，
                                // 但需要提前知道bitmap相关信息，现在只load page

                                ret = ymalloc((void **)&buf, LSV_PAGE_SIZE);
                                if(unlikely(ret)){
                                        DERROR("malloc for buf, for load page from l2_cache failed, ret : %d\n", ret);
                                        GOTO(ERR1, ret);
                                }
                                // ERR2
                                if(l2u->status == LSV_RCACHE_IN_SSD){
                                        //chunk数据已经写入到ssd中，可以从ssd中读取
                                        DINFO("l2_cache in ssd, ssd_chunk_id %u real_chunk_id %u chunk_off %d\n", l2u->l2_chunk_id, l2u->chunk_id, chunk_off);
                                        ret = lsv_rcache_read_data(volume_proto, buf, l2u->l2_chunk_id, chunk_off, LSV_PAGE_SIZE);
                                }else if(l2u->status == LSV_RCACHE_IN_HDD){
                                        DINFO("l2_cache in hdd, ssd_chunk_id %u real_chunk_id %u chunk_off %d\n", l2u->l2_chunk_id, l2u->chunk_id, chunk_off);
                                        //chunk数据还未写入ssd中，从hdd中读取，不需要预取，chunk正在想ssd中写入。。
                                        ret = lsv_rcache_read_data(volume_proto, buf, l2u->chunk_id, chunk_off, LSV_PAGE_SIZE);
                                }else{
                                        DINFO("swap out into ssd failed!\n");
                                        if(!list_empty(&l2u->index_list))
                                                list_del_init(&l2u->index_list);
                                        goto ERR2;
                                }

                                if(ret != LSV_PAGE_SIZE){
                                        DERROR("call lsv_rcache_read data load page from l2_cache failed, ret: %d\n", ret);
                                        GOTO(ERR2, ret);
                                }
                                //查看l2u是否被回收，若被回收，则返回0
                                if(l2u->chunk_id != chunk_id){
                                        goto ERR2;
                                }

                                //update lru
                                list_del_init(&l2u->l2_lru_hook);
                                list_add(&l2u->l2_lru_hook, &rcache->l2_lru);

                                mbuffer_appendmem(append_buf, buf + page_offset, size);
                                //相对于page来说，chunk_off为0
                                lsv_rcache_pu_insert(volume_proto, l2u->chunk_id, 0, off, size, buf);

                                DINFO("lba %llu hash_key %d log_chunk_id %u l2_cache_chunk_id %u chunk_off %d\n",
                                                (LLU)lba, hash_key, l2u->chunk_id, l2u->l2_chunk_id, chunk_off);
                                yfree((void **)&buf);
                                return size;
                        }
                }

                return -1;
ERR2:
                yfree((void **)&buf);
ERR1:
                if (ret > 0) {
                        ret = -ret;
                }
                return ret;
        }

        int lsv_rcache_hash_index_lookup(lsv_volume_proto_t *volume_proto, lsv_u32_t chunk_id,
                        lsv_s32_t chunk_offset,lsv_u64_t off, lsv_s32_t size, buffer_t *append_buf){
                int ret = 0;
                lsv_rcache_t * rcache = (lsv_rcache_t *)volume_proto->rcache;
                /* 添加lsv_rcache_page_lookup函数后，第一次通过lsv_rcache_page_lookup查找，不许要通过bitmap，若查不到，
                 * 则查询bitmap后，再查询chunk的cache是否已经load，否则，load*/
                lsv_u64_t lba = off - off % LSV_PAGE_SIZE;
                ltable_rdlock(&rcache->lba_ltable, lba);
                DINFO("page lookup chunk_id %u chunk_off: %d page %ju lba %ju size %d\n", chunk_id, chunk_offset, lba, off, size);
                ret = lsv_rcache_page_hash_index_lookup(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                ltable_unlock(&rcache->lba_ltable, lba);
                if(ret > 0){
                        rcache->page_hit_cnt ++;
                        return ret;
                }

                if(LSV_RCACHE_CHUNK_NUM_MAX > 0) {
                        DINFO("page lookup chunk_id %u chunk_off: %d page %ju lba %ju size %d\n", chunk_id, chunk_offset, lba, off, size);
                        ltable_rdlock(&rcache->chunk_id_ltable, chunk_id);
                        ret = lsv_rcache_chunk_hash_index_lookup(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                        ltable_unlock(&rcache->chunk_id_ltable, chunk_id);
                        if(ret > 0){
                                rcache->chunk_hit_cnt ++;
                                return ret;
                        }
                }

                if(LSV_RCACHE_CHUNK_NUM_MAX_L2 > 0){
                        DINFO("l2_cache lookup chunk_id %u chunk_off: %d page %ju lba %ju size %d\n", chunk_id, chunk_offset, lba, off, size);
                        //TODO 当LSV_RCACHE_CHUNK_NUM_MAX>0时，l2_cache为cu交换到内存的，此阶段不存在yield过程，否则，l2_cache在load过程中，会yield，需要加锁
                        //TODO 此处锁，可以根据LSV_RCACHE_CHUNK_NUM_MAX的值进行判断是否加锁
                        ltable_rdlock(&rcache->l2_chunk_id_ltable, chunk_id);
                        ret = lsv_rcache_l2_cache_hash_index_lookup(volume_proto, chunk_id, chunk_offset, off, size, append_buf);
                        ltable_unlock(&rcache->l2_chunk_id_ltable, chunk_id);
                        if(ret > 0){
                                rcache->l2_hit_cnt ++;
                                return ret;
                        }
                }
                return 0;
        }
